﻿using IOP.Decoder.MQTT;
using IOP.Models.Message.MQTT;
using IOP.Models.Message.MQTT.Package;
using IOP.Models.Net;
using IOP.Models.Queue;
using IOP.Pulsar.Abstractions;
using IOP.Pulsar.MQTT.Abstractions;
using Microsoft.Extensions.Logging;
using System;
using System.Buffers;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace IOP.Pulsar.MQTT
{
    /// <summary>
    /// Entry point of the MQTT product line for one client connection.
    /// Decodes incoming bytes into MQTT packages, creates the session when a CONNECT
    /// package arrives, wraps every package in an <see cref="MQTTContext"/> and hands it
    /// to the product line through the <see cref="Packages"/> queue.
    /// </summary>
    public class MQTTProductLineEntry : IProductLineEntry<MQTTContext>
    {
        /// <summary>
        /// Queue of decoded package contexts waiting to be pushed through the product line.
        /// </summary>
        public LoopQueue<MQTTContext> Packages { get; set; } = new LoopQueue<MQTTContext>();

        /// <summary>
        /// Product line that processes every dequeued context.
        /// </summary>
        public IProductLine<MQTTContext> ProductLine { get; set; }

        /// <summary>
        /// Logger.
        /// </summary>
        private readonly ILogger<MQTTProductLineEntry> _Logger;
        /// <summary>
        /// Topic center.
        /// </summary>
        private readonly ITopicCenter _TopicCenter;
        /// <summary>
        /// Session center used to create and dispose the session of this connection.
        /// </summary>
        private readonly ISessionCenter _SessionCenter;
        /// <summary>
        /// Whether a CONNECT package has been accepted on this entry.
        /// </summary>
        private volatile bool _IsConnect = false;
        /// <summary>
        /// Session created for the connected client; null until a CONNECT package is handled.
        /// </summary>
        private Session _Session { get; set; } = null;
        /// <summary>
        /// Socket monitor remembered from the first <see cref="EntryHandle"/> call.
        /// </summary>
        private SocketMonitor _ConnectedSocket { get; set; }
        /// <summary>
        /// Monitor that serializes transmissions triggered by the session.
        /// A monitor is used rather than a spin lock because the guarded section
        /// performs a blocking socket send.
        /// </summary>
        private readonly object _SyncRoot = new object();
        /// <summary>
        /// Disposal state: 0 while alive, 1 once <see cref="Dispose"/> has started.
        /// Kept as an int so it can be flipped atomically.
        /// </summary>
        private int _DisposeState = 0;

        /// <summary>
        /// Whether <see cref="Dispose"/> has been called.
        /// </summary>
        private bool IsDisposed => Volatile.Read(ref _DisposeState) != 0;

        /// <summary>
        /// Creates the entry and subscribes to the package queue notification.
        /// </summary>
        /// <param name="productLine">Product line that processes the contexts.</param>
        /// <param name="logger">Logger.</param>
        /// <param name="center">Topic center.</param>
        /// <param name="sessionCenter">Session center.</param>
        public MQTTProductLineEntry(IProductLine<MQTTContext> productLine, ILogger<MQTTProductLineEntry> logger, 
            ITopicCenter center, ISessionCenter sessionCenter)
        {
            ProductLine = productLine;
            _TopicCenter = center;
            _Logger = logger;
            _SessionCenter = sessionCenter;
            Packages.OnQueueHasElements += Packages_OnQueueHasElements;
        }

        /// <summary>
        /// Decodes as many MQTT packages as possible from <paramref name="queue"/> and enqueues
        /// a context for each of them. A CONNECT package starts the keep-alive check and creates
        /// the session. On any failure the error is logged, a CONNACK (connect failures) or
        /// DISCONNECT (other failures) package is sent if the socket is still connected, and the
        /// socket is disconnected. Does nothing once the entry has been disposed.
        /// </summary>
        /// <param name="queue">Received bytes; consumed by the decoder.</param>
        /// <param name="socket">Socket monitor the bytes were received on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="socket"/> is null.</exception>
        public void EntryHandle(ref ReadOnlySequence<byte> queue, SocketMonitor socket)
        {
            if (IsDisposed) return;
            if (socket == null) throw new ArgumentNullException(nameof(socket));
            if (_ConnectedSocket == null) _ConnectedSocket = socket;
            try
            {
                while (!queue.IsEmpty)
                {
                    var package = MQTTDecoder.Decode(ref queue);
                    // The decoder returned nothing: stop and wait for the next call.
                    if (package == null) break;

                    if (package is ConnectPackage connect)
                    {
                        if (_IsConnect) throw new MQTTConnectException(connect, ReturnCodeType.ClientIdError, $"The MQTT client {connect.ClientIdentifier} is alreadly connected");
                        socket.EnableKeepAliveCheckAndStart(connect.KeepAlive);
                        socket.KeepAliveTimeout += Socket_KeepAliveTimeout;
                        _Session = _SessionCenter.CreateSession(socket, connect);
                        _Session.OnTransmit += Session_OnTransmit;
                        _IsConnect = true;
                    }

                    // Any package received before a CONNECT was accepted is a protocol violation.
                    if (!_IsConnect) throw new InvalidOperationException("This mqtt client is not finish connect");

                    MQTTContext context = new MQTTContext(_Session);
                    context.Request = new MQTTRequest(package, socket);
                    context.Response = new MQTTResponse(socket);
                    Packages.Enqueue(context);
                }
            }
            catch (MQTTConnectException mqttE)
            {
                _Logger.LogError(mqttE, "{Message}", mqttE.Message);
                ConnackPackage connack = new ConnackPackage(false, mqttE.ReturnCode);
                BestEffortSocketWork(() =>
                {
                    if (!socket.ConnectedSocket.Connected) return;
                    socket.ConnectedSocket.Send(connack.ToBytes());
                    socket.ConnectedSocket.Disconnect(false);
                });
            }
            catch (Exception e)
            {
                _Logger.LogError(e, "{Message}", e.Message);
                BestEffortSocketWork(() =>
                {
                    if (!socket.ConnectedSocket.Connected) return;
                    DisconnectPackage package = new DisconnectPackage(0);
                    socket.ConnectedSocket.Send(package.ToBytes());
                    socket.ConnectedSocket.Disconnect(false);
                });
            }
        }

        /// <summary>
        /// Forwards a publish package from the session to the connected socket.
        /// Sends are serialized; nothing is sent after disposal or when the socket is not connected.
        /// </summary>
        /// <param name="obj">Publish package to transmit.</param>
        private void Session_OnTransmit(PublishPackage obj)
        {
            lock (_SyncRoot)
            {
                if (IsDisposed) return;
                try
                {
                    if (_ConnectedSocket.ConnectedSocket.Connected)
                        _ConnectedSocket.ConnectedSocket.Send(obj.ToBytes());
                }
                catch (Exception e) when (IsSocketFailure(e))
                {
                    // The socket can be closed between the checks above and the send
                    // (Dispose does not take this lock); do not fault the event publisher.
                    _Logger.LogWarning(e, "Failed to transmit publish package: {Message}", e.Message);
                }
            }
        }

        /// <summary>
        /// Queue notification handler: drains the queue and runs the product line for every
        /// context on the thread pool. Exceptions raised by the product line are logged,
        /// because nothing observes the started task.
        /// </summary>
        /// <param name="obj">Queue that reported pending elements.</param>
        private void Packages_OnQueueHasElements(LoopQueue<MQTTContext> obj)
        {
            while (!obj.IsEmpty)
            {
                obj.Dequene(out MQTTContext context);
                Task.Run(() =>
                {
                    try
                    {
                        ProductLine.ProductLine?.Invoke(context);
                    }
                    catch (Exception e)
                    {
                        _Logger.LogError(e, "Unhandled exception in the MQTT product line: {Message}", e.Message);
                    }
                });
            }
        }

        /// <summary>
        /// Keep-alive timeout handler: logs the timeout, then sends a DISCONNECT package and
        /// disconnects the socket if it is still connected.
        /// </summary>
        /// <param name="obj">Socket monitor whose keep-alive check timed out.</param>
        private void Socket_KeepAliveTimeout(SocketMonitor obj)
        {
            _Logger.LogError("This client is time out");
            BestEffortSocketWork(() =>
            {
                if (!obj.ConnectedSocket.Connected) return;
                DisconnectPackage package = new DisconnectPackage(0);
                obj.ConnectedSocket.Send(package.ToBytes());
                obj.ConnectedSocket.Disconnect(false);
            });
        }

        /// <summary>
        /// Releases the session and the socket and detaches every event handler.
        /// Safe to call more than once and before any data was received.
        /// </summary>
        public void Dispose()
        {
            // Only the first caller performs the teardown.
            if (Interlocked.Exchange(ref _DisposeState, 1) != 0) return;

            _IsConnect = false;
            var session = _Session;
            var monitor = _ConnectedSocket;
            try
            {
                if (session != null) session.OnTransmit -= Session_OnTransmit;
                Packages.OnQueueHasElements -= Packages_OnQueueHasElements;
                if (monitor != null)
                {
                    monitor.KeepAliveTimeout -= Socket_KeepAliveTimeout;
                    monitor.DisableKeepAliveCheck();
                }
                if (session != null) _SessionCenter.DisposeSession(session);
            }
            finally
            {
                if (monitor != null)
                {
                    // Shutdown throws when the socket is already disconnected; Close must still run.
                    BestEffortSocketWork(() => monitor.ConnectedSocket.Shutdown(SocketShutdown.Both));
                    BestEffortSocketWork(() => monitor.ConnectedSocket.Close());
                }
            }
        }

        /// <summary>
        /// Runs a socket operation and logs, instead of propagating, the failures that are
        /// expected when the peer is gone or the socket was already closed.
        /// </summary>
        /// <param name="socketWork">Socket operation to run.</param>
        private void BestEffortSocketWork(Action socketWork)
        {
            try
            {
                socketWork();
            }
            catch (Exception e) when (IsSocketFailure(e))
            {
                _Logger.LogWarning(e, "Socket operation failed: {Message}", e.Message);
            }
        }

        /// <summary>
        /// Whether the exception is one a socket raises when it is closed or disconnected.
        /// </summary>
        /// <param name="e">Exception to classify.</param>
        /// <returns>True for <see cref="SocketException"/> and <see cref="ObjectDisposedException"/>.</returns>
        private static bool IsSocketFailure(Exception e)
        {
            return e is SocketException || e is ObjectDisposedException;
        }
    }
}
